package com.elasticsearch.util;

import java.io.IOException;
import java.lang.reflect.Constructor;

import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

/**
 * Holds two shared Elasticsearch {@link TransportClient} instances: a "source" client and a
 * "target" client, each registered with two hard-coded transport addresses.
 *
 * <p>Both clients are created once, when this class is initialised. If creation fails, the
 * failure is remembered and rethrown (wrapped in an {@link IllegalStateException}) by the
 * accessor methods, instead of silently handing out {@code null}.
 */
public class ClientUtil
{

    // If the number of nodes in your cluster is stable, it is best to turn sniffing off.
    // Raising the ping timeout above the default 5s also goes a long way towards avoiding
    // the "No node available" exception.
    static Settings defaultSettings = ImmutableSettings.settingsBuilder()
            .put("client.transport.sniff", false)
            .put("client.transport.ping_timeout", "10s")
            .build();

    // Client connected to the cluster that documents are written to.
    private static final TransportClient targetClient;

    // Client connected to the cluster that documents are read from.
    private static final TransportClient sourceClient;

    // Cause of a failed initialisation, or null when both clients were created.
    private static final Throwable initFailure;

    static {
        TransportClient target = null;
        TransportClient source = null;
        Throwable failure = null;
        try {
            // The constructor is looked up reflectively and forced accessible, as in the
            // original code.
            Constructor<?> constructor = TransportClient.class.getDeclaredConstructor(Settings.class);
            constructor.setAccessible(true);
            Settings finalSettings = ImmutableSettings.settingsBuilder()
                    .put(defaultSettings)
                    .build();

            target = (TransportClient) constructor.newInstance(finalSettings);
            target.addTransportAddress(new InetSocketTransportAddress("192.168.1.100", 9300))
                  .addTransportAddress(new InetSocketTransportAddress("192.168.1.101", 9300));

            source = (TransportClient) constructor.newInstance(finalSettings);
            source.addTransportAddress(new InetSocketTransportAddress("192.168.1.110", 9300))
                  .addTransportAddress(new InetSocketTransportAddress("192.168.1.111", 9300));
        } catch (Exception e) {
            // Do not leave a half-initialised pair behind: release whatever was created and
            // keep the cause so the accessors can report it.
            failure = e;
            closeQuietly(source);
            closeQuietly(target);
            source = null;
            target = null;
        }
        targetClient = target;
        sourceClient = source;
        initFailure = failure;
    }

    /**
     * Returns the shared client for the source cluster.
     *
     * @return the source client, never {@code null}
     * @throws IllegalStateException if the clients could not be created during class initialisation
     */
    public static Client getSourceTransportClient() {
        ensureInitialized();
        return sourceClient;
    }

    /**
     * Returns the shared client for the target cluster.
     *
     * @return the target client, never {@code null}
     * @throws IllegalStateException if the clients could not be created during class initialisation
     */
    public static Client getTargetTransportClient() {
        ensureInitialized();
        return targetClient;
    }

    // Throws if the static initialiser failed, preserving the original cause.
    private static void ensureInitialized() {
        if (initFailure != null) {
            throw new IllegalStateException("Failed to create Elasticsearch transport clients", initFailure);
        }
    }

    // Closes a client during failure cleanup; a secondary close error must not mask the original one.
    private static void closeQuietly(TransportClient client) {
        if (client == null) {
            return;
        }
        try {
            client.close();
        } catch (RuntimeException ignored) {
            // Best-effort cleanup only; the initialisation failure is what gets reported.
        }
    }

    /**
     * Entry point: obtains both clients and closes them again on exit. The migration call
     * itself is left commented out, as in the original.
     *
     * @param args unused
     * @throws IOException declared for the (commented-out) migration call
     * @throws InterruptedException declared for the (commented-out) migration call
     */
    public static void main(String[] args) throws IOException, InterruptedException
    {
        // Page size; must not be too large: a large value hurts cluster performance and may
        // trigger the "no node" exception.
        int pageSize = 40;
        Client sourceclient = ClientUtil.getSourceTransportClient();
        Client targetclient = ClientUtil.getTargetTransportClient();
        try {
            // Call the doMigrate method:
            // doMigrate(sourceclient, targetclient, "test", "testnew", "test", pageSize);
        } finally {
            // Release both clients even if closing the first one fails.
            try {
                sourceclient.close();
            } finally {
                targetclient.close();
            }
        }
    }


//    private void doMigrate(Client sourceclient, Client targetclient, String sourceIndexName, String targetIndexName, String indexDocType, int pageSize)
//	{
//		int total = 0;
//		SearchResponse searchResponse = sourceclient.prepareSearch(sourceIndexName).setSearchType(SearchType.SCAN)
//				.setQuery(matchAllQuery()).setSize(pageSize).setScroll(TimeValue.timeValueSeconds(20)).execute()
//				.actionGet(); // keep the scroll time short so it does not put load on the cluster
//		boolean exists = targetclient.admin().indices().prepareExists(targetIndexName).execute().actionGet().isExists();
//		if (!exists)
//			targetclient
//					.admin()
//					.indices()
//					.prepareCreate(targetIndexName)
//					.setSettings(
//							settingsBuilder().put("index.number_of_replicas", 0).put("index.refresh_interval", "-1"))
//					.execute().actionGet(); // zero replicas and no refresh, to speed up indexing
//		try
//		{
//			Thread.sleep(200);
//		} catch (InterruptedException e)
//		{
//			e.printStackTrace();
//		}
//		BulkProcessor bulkProcessor = BulkProcessor.builder(targetclient, new BulkProcessor.Listener()
//		{
//
//			@Override
//			public void beforeBulk(long executionId, BulkRequest request)
//			{
//
//			}
//
//			@Override
//			public void afterBulk(long executionId, BulkRequest request, BulkResponse response)
//			{
//				if (response.hasFailures())
//				{
//					throw new RuntimeException("BulkResponse show failures: " + response.buildFailureMessage());
//				}
//			}
//
//			@Override
//			public void afterBulk(long executionId, BulkRequest request, Throwable failure)
//			{
//				throw new RuntimeException("Caught exception in bulk: " + request + ", failure: " + failure, failure);
//			}
//		}).setConcurrentRequests(10).build(); // number of concurrent requests; tune it for your own machine
//
//		while (true)
//		{
//			searchResponse = sourceclient.prepareSearchScroll(searchResponse.getScrollId())
//					.setScroll(TimeValue.timeValueSeconds(20)).execute().actionGet();
//			for (SearchHit hit : searchResponse.getHits())
//			{
//
//				IndexRequestBuilder indexRequestBuilder = targetclient.prepareIndex(targetIndexName, indexDocType);
//				indexRequestBuilder.setSource(hit.getSource());
//				indexRequestBuilder.setId(hit.getId());
//				indexRequestBuilder.setOpType(IndexRequest.OpType.INDEX);
//				bulkProcessor.add(indexRequestBuilder.request());
//				total++;
//			}
//			System.out.println("Already migrated : " + total + " records!");
//			if (searchResponse.getHits().hits().length == 0)
//			{
//				break;
//			}
//		}
//		try
//		{
//			Thread.sleep(10000);//Sleep 10s waiting the cluster.
//		} catch (InterruptedException e)
//		{
//			e.printStackTrace();
//		}
//		bulkProcessor.close();
//		targetclient
//		.admin()
//		.indices().prepareUpdateSettings(targetIndexName).setSettings(
//				settingsBuilder().put("index.number_of_replicas", 1).put("index.refresh_interval", "1s"))
//		.execute().actionGet();
//	}



}
